
I get the error FileNotFoundError: [Errno 2] No such file or directory when I try to write a csv file to the bucket, using a csv writer that loops over batches of data. The relevant part of the Cloud Function logs around that error:


File "/workspace/main.py", line 299, in write_to_csv_file with
open(filepath, "w") as outcsv: FileNotFoundError: [Errno 2] No such
file or directory: 'gs://MY_BUCKET/MY_CSV.csv'

Function execution took 52655 ms, finished with status: 'crash' 

OpenBLAS WARNING - could not determine the L2 cache size on this
system, assuming 256k  ```

This happens although the bucket_filepath definitely exists: I can upload an empty dummy file and get its "gsutil URI" (right-click on the three dots at the right side of the file), and the bucket_filepath looks exactly the same: 'gs://MY_BUCKET/MY_CSV.csv'.

As a check, I saved a dummy pandas dataframe with pd.to_csv instead, and it worked with the same bucket_filepath (!).

Therefore, there must be another reason: likely the csv writer is not accepted, or the with statement that opens the file.

The code that throws the error is as follows. The same code works outside of the Google Cloud Function, in a normal cron job on a local server. I have added two debug prints around the line that throws the error; the print("Right after opening the file ...") no longer shows up. The subfunction query_execute_batch(), which write_to_csv_file() calls for each batch, is also shown, but it is likely not the problem here, since the error already happens at the very start, when opening the csv file for writing.

requirements.txt (whose packages are then imported as modules):

SQLAlchemy>=1.4.2
google-cloud-storage>=1.16.1
mysqlclient==2.1.0
pandas==1.2.3
fsspec==2021.11.1
gcsfs==2021.11.1
unicodecsv==0.14.1

And from the main.py:

def query_execute_batch(connection):
    """Function for reading data from the query result into batches
    :yield: each result in a loop is a batch of the query result
    """
    results = execute_select_batch(connection, SQL_QUERY)
    print(f"len(results): {len(results)}")
    for result in results:
        yield result

def write_to_csv_file(connection, filepath):
    """Write the data in a loop over batches into a csv.
    This is done in batches since the query from the database is huge.
    :param connection: mysqldb connection to DB
    :param filepath: path to csv file to write data
    returns: metadata on rows and time
    """
    countrows = 0
    print("Right before opening the file ...")    
    with open(filepath, "w") as outcsv:
        print("Right after opening the file ...")        
        writer = csv.DictWriter(
            outcsv,
            fieldnames=FIELDNAMES,
            extrasaction="ignore",
            delimiter="|",
            lineterminator="\n",
        )
        # write header according to fieldnames
        writer.writeheader()

        for batch in query_execute_batch(connection):
            writer.writerows(batch)
            countrows += len(batch)
        datetime_now_save = datetime.now()
    return countrows, datetime_now_save

Note that for the above script to work, I import gcsfs, which makes the bucket read-write available. Otherwise I would likely need a Google Cloud Storage client object, for example:

storage_client = storage.Client()
bucket = storage_client.bucket(BUCKET_NAME)

and then create the file in that bucket with further functions (a sketch of that alternative is shown below for comparison), but that is not the aim here.
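For comparison, a minimal sketch of that client-based alternative, under my assumptions (it reuses FIELDNAMES from above; the helper name is made up, and it builds the whole csv in memory, so it is only meant for illustration, not for the huge batched query):

import csv
import io

from google.cloud import storage

def write_csv_via_storage_client(rows, bucket_name, blob_name):
    """Hypothetical helper: build the csv in memory, then upload it as one blob."""
    buffer = io.StringIO()
    writer = csv.DictWriter(
        buffer,
        fieldnames=FIELDNAMES,
        extrasaction="ignore",
        delimiter="|",
        lineterminator="\n",
    )
    writer.writeheader()
    writer.writerows(rows)

    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(blob_name)
    # upload_from_string() sends the buffered csv to the bucket in one request
    blob.upload_from_string(buffer.getvalue(), content_type="text/csv")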

The following is the pd.to_csv code that works; it uses the output of a dummy SQL query SELECT 1 as the input of a dataframe. This can be saved to the same bucket_filepath. Of course, the reason might not just be pd.to_csv() as such, but also that the dataset is a dummy instead of complex unicode strings from a huge SELECT query. Or there is another reason, I am just guessing.

if records is not None:
    df = pd.DataFrame(records.fetchall())
    df.columns = records.keys()
    df.to_csv(filepath,
        index=False,
    )
    datetime_now_save = datetime.now()
    countrows = df.shape[0]

I would like to use the csv writer so that I can write in unicode with the unicodecsv module and write the data in batches (a short unicodecsv sketch follows below).
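For reference, a minimal sketch of how I understand the unicodecsv usage (the file path and field names here are made-up placeholders, not the actual Cloud Function code): unicodecsv encodes each row to utf-8 bytes, which is why the target file has to be opened in binary mode:

import unicodecsv

# unicodecsv writes encoded bytes, so the file is opened in binary mode ("wb")
with open("/tmp/example.csv", "wb") as outcsv:
    writer = unicodecsv.DictWriter(
        outcsv,
        fieldnames=["col_a", "col_b"],
        delimiter="|",
        lineterminator="\n",
        encoding="utf-8",
    )
    writer.writeheader()
    writer.writerows([{"col_a": "ä", "col_b": "ß"}])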

I might be willing to change to batches in pandas (loop + append mode, or chunksize), as in Writing large Pandas Dataframes to CSV file in chunks, to get rid of this bucket filepath problem, but I would rather use the ready code (never touch a running system).

How can I save that csv with the csv writer so that it opens a new file in the bucket in write mode, i.e. with open(filepath, "w") as outcsv:?

The given function write_to_csv_file() is just a tiny part of the Cloud Function, which uses a wide range of functions and cascaded functions. I cannot show the whole reproducible case here and hope that it can be answered by experience or simpler examples.

1 Answer

The solution is surprising. You must import and use the gcsfs module if you want to write to a file with open().

If you use pd.to_csv(), the import gcsfs is not needed, but the gcsfs package is still needed in requirements.txt to make pd.to_csv() work; thus, pandas to_csv() seems to use it automatically.

The pd.to_csv() surprise put aside, here is the code that answers the question (tested):

def write_to_csv_file(connection, filepath):
    """Write the QUERY result in a loop over batches into a csv.
    This is done in batches since the query from the database is huge.
    :param connection: mysqldb connection to DB
    :param filepath: path to csv file to write data
    return: metadata on rows and time
    """
    countrows = 0
    print("Right before opening the file ...")
   

    # A gcsfs object is needed to open a file.
    # https://stackoverflow.com/questions/52805016/how-to-open-a-file-from-google-cloud-storage-into-a-cloud-function
    # https://gcsfs.readthedocs.io/en/latest/index.html#examples
    # Side-note (Exception):
    # pd.to_csv() needs neither the gcsfs object, nor its import.
    # It is not used here, but it has been tested with examples.
    fs = gcsfs.GCSFileSystem(project=MY_PROJECT)
    fs.ls(BUCKET_NAME)


    # wb needed, else "builtins.TypeError: must be str, not bytes"
    # https://stackoverflow.com/questions/5512811/builtins-typeerror-must-be-str-not-bytes
    with fs.open(filepath, 'wb') as outcsv:
        print("Right after opening the file ...")

        writer = csv.DictWriter(
            outcsv,
            fieldnames=FIELDNAMES,
            extrasaction="ignore",
            delimiter="|",
            lineterminator="\n",
        )
        # write header according to fieldnames
        print("before writer.writeheader()")
        writer.writeheader()
        print("after writer.writeheader()")

        for batch in query_execute_batch(connection):
            writer.writerows(batch)
            countrows += len(batch)
        datetime_now_save = datetime.now()
    return countrows, datetime_now_save

Side-note

Do not use the csv writer like this.

It takes too long. While pd.to_csv() with a chunksize parameter of 5000 needs just 62 s for the 700k rows to be loaded and stored as a csv in the bucket, the Cloud Function with the batched writer takes more than 9 minutes, which is over the timeout limit. I am therefore forced to use pd.to_csv() instead and to convert my data into a dataframe for that; a sketch of that alternative follows.
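A minimal sketch of that pandas alternative, under my assumptions (the function name is made up; records and filepath correspond to the variables above, and chunksize=5000 is the value from the timing above):

import pandas as pd
from datetime import datetime

def write_to_csv_file_pandas(records, filepath):
    """Sketch: convert the query result to a dataframe and let pandas
    write it to the bucket path in chunks of 5000 rows."""
    df = pd.DataFrame(records.fetchall())
    df.columns = records.keys()
    # pandas resolves the gs:// path via fsspec/gcsfs automatically,
    # as long as gcsfs is listed in requirements.txt
    df.to_csv(
        filepath,
        index=False,
        sep="|",
        chunksize=5000,
    )
    return df.shape[0], datetime.now()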